Elasticsearch篇之分布式特性

Elasticsearch篇之分布式特性

  • es支持集群模式, 是一个分布式系统, 其好处主要有两个:
    • 增大系统容量, 如内存, 磁盘, 使得es集群可以支持PB级的数据
    • 提高系统可用性, 即使部分节点停止服务, 整个集群依然可以正常服务
  • es集群由多个es实例组成
    • 不同集群通过集群名字来区分, 可以通过 cluster.name 进行修改, 默认为elasticsearch
    • 每个es实例本质上是一个JVM进程, 且有自己的名字, 通过 node.name 进行修改

cerebro安装与运行

  • cerebro是使用Scala,Play Framework,AngularJS和Bootstrap构建的开源(MIT许可)elasticsearch Web管理工具。

  • github地址: https://github.com/lmenezes/cerebro

  • 安装与运行

    # 解压
    [jlc@bogon es]$ tar -zxf cerebro-0.7.2.tgz
    
    # 运行
    [jlc@bogon cerebro-0.7.2]$ bin/cerebro
    [info] play.api.Play - Application started (Prod)
    [info] p.c.s.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
  • 访问cerebro

    1. 在浏览器地址栏中输入http://127.0.0.1:9000/
    2020-04-11_015107
    1. 在Node address输入es实例的地址, 如本演示输入: ``http://127.0.0.1:9200`, 点击connect.

    2. 接着就进入cerebro的界面, 如下图所示:

      2020-04-11_015402

构建集群

启动一个节点

  • 运行如下命令可以启动一个es节点实例

    • bin/elasticsearch -Ecluster.name=my_cluster -Epath.data=my_cluster_node1 -Enode.name=node1 -Ehttp.port=5200

    • 在cerebro中输入http://127.0.0.1:5200 连接此节点, 显示的node节点信息如下:

      2020-04-11_020749

Cluster State

  • es集群相关的数据称为cluster state, 主要记录以下信息:

    • 节点信息: 比如节点名称, 连接地址等

    • 索引信息: 比如索引名称, 配置等

    • ……

      2020-04-11_022606

Master Node

  • 可以修改cluster state的节点称为 master 节点, 一个集群只能有一个

  • cluster stats 存储在每个节点上, master维护最新版本并同步给其他节点

  • master节点是通过集群中所有节点选举产生的, 可以被选举的节点称为master-eligible节点, 相关配置如下:

    • node.master: true # 使此节点成为master-eligible节点
    2020-04-11_023133

创建一个索引

  • 我们通过如下api创建一个索引

    • PUT test_index

      2020-04-11_023412

    • 创建索引后, cerebro界面信息如下

      2020-04-11_023655

Coordinating Node

  • 处理请求的节点即为 Coordinating 节点, 该节点为所有节点的默认角色, 不能取消

    • 该节点负责路由请求到正确的节点处理, 比如创建索引的请求到master节点 (因为只有master节点能够修改cluster state信息)
    2020-04-11_024155

Data Node

  • 存储数据的节点称为data节点, 默认节点都是data类型, 相关配置如下:

    • node.data: true
    2020-04-11_024503

单点问题

  • 如果node1停止服务, 则集群停止服务

    2020-04-11_024704
  • 解决方案: 新增一个节点

    2020-04-11_025116
    • 运行如下命令可以启动一个节点实例

      • bin/elasticsearch -Ecluster.name=my_cluster -Epath.data=my_cluster_node2 -Enode.name=node2 -Ehttp.port=6200
    • 运行后, cerebro显示集群状态信息如下所示

      2020-04-11_025302

提高系统可用性

副本与分片

  • 服务可用性
    • 2个节点情况下, 允许其中一个节点停止服务
  • 数据可用性
    • 引入副本 (Replication) 解决
    • 每个节点上都有完备的数据

副本

  • 如下图所示, node2上是test_index的副本

    2020-04-11_030350

分片

  • 引入分片是为了增大系统容量

  • 如何将数据分布于所有节点上?

    • 引入分片 (Shard) 解决问题
  • 分片是es支持PB级数据的基石

    • 分片存储了部分数据, 可以分布于任意节点上
    • 分片数在索引创建时指定且后续不允许再更改, 默认为5个
    • 分片有主分片和副本分片之分, 以实现数据的高可用
    • 副本分片的数据由主分片同步, 可以有多个, 从而提高读取的吞吐量
  • 下图演示的是3个节点的集群中test_index的分片分布情况, 创建时我们制定了3个分片和一个副本, api如下所示:

    2020-04-11_031143

副本与分片演示

  • 我们将要构建一个如下图所示的es集群

    2020-04-11_031217
  1. 启动三个es节点实例, 一个作为master节点, 另外两个作为data节点

    # node1
    [jlc@localhost elasticsearch-6.1.1]$ bin/elasticsearch -Ecluster.name=my_cluster -Epath.data=my_cluster_node1 -Enode.name=node1 -Ehttp.port=5200
    
    # node2
    [jlc@localhost elasticsearch-6.1.1]$ bin/elasticsearch -Ecluster.name=my_cluster -Epath.data=my_cluster_node2 -Enode.name=node2 -Ehttp.port=6200
    
    # node3
    [jlc@localhost elasticsearch-6.1.1]$ bin/elasticsearch -Ecluster.name=my_cluster -Epath.data=my_cluster_node3 -Enode.name=node3 -Ehttp.port=7200
  2. 创建test_shard_replic索引, 并设置副本为1, 分片数为3 (其中在cerebro中创建方式如下)

    2020-04-11_032540

  3. 创建完毕后, cerebro显示的集群状态如下图所示

    2020-04-11_032728

    注: 如果磁盘空间太小, es默认不能分配, 需要修改如下配置进行分配

    2020-04-11_033146

两个问题

  • 提出两个问题的当前es集群状态如下:

    2020-04-11_033853

  1. 此时增加节点是否能够提高test_index的数据容量?

    • 不能, 因为只有3个分片, 已经分布在3台节点上, 新增的节点无法利用
    • 新增一个节点后, 集群状态如下

    2020-04-11_034135

  2. 此时增加副本数是否能够提高test_index的读取吞吐量?

    • 不能, 因为新增的副本也是分布在这3个节点上, 还是利用了同样的资源, 如果要增加吞吐量, 还需要新增节点.
    • 新增副本数, 集群状态如下

    2020-04-11_034444

  • 分片数的设定很重要, 需要提前规划好
    • 过小导致后续无法通过增加节点实现水平扩容
    • 过大会导致一个节点上分布过多的分片, 造成资源浪费, 同时会影响查询性能

Cluster Health

  • 通过如下api 可以查看集群状况, 包括以下三种:

    • green 健康状态, 值所有的主副分片都正常分配
    • yellow 指所有的主分片都正常分配, 但是有副本分片未正常分配
    • red 有主分片未分配
    # request
    GET /_cluster/health
    
    # response
    {
      "cluster_name": "my_cluster",
      "status": "green",
      "timed_out": false,
      "number_of_nodes": 3,
      "number_of_data_nodes": 3,
      "active_primary_shards": 3,
      "active_shards": 6,
      "relocating_shards": 0,
      "initializing_shards": 0,
      "unassigned_shards": 0,
      "delayed_unassigned_shards": 0,
      "number_of_pending_tasks": 0,
      "number_of_in_flight_fetch": 0,
      "task_max_waiting_in_queue_millis": 0,
      "active_shards_percent_as_number": 100
    }

故障转移 (Failover)

  • 集群由3个节点组成, 如下所示, 此时集群状态是green

    2020-04-11_041054

  • node1所在机器宕机导致服务终止, 此时集群会如何处理?

    1. node2和node3发现node1无法响应一段时间后会发起master选举, 比如这里选择node2为master节点. 此时由于主分片PO下线, 集群状态变为Red

      2020-04-11_041349

    2. node2发现主分片P0未分配, 将R0提升为主分片. 此时由于所有主分片都正常分配, 集群变为yellow

      2020-04-11_041541

    3. node2为P0和P1生成新的副本, 集群状态变为green

      2020-04-11_041656

文档分布式存储

  • 当用户发起一个PUT请求, 文档最终会存储在分片上, 如下图所示:

    • 假设Document1 最终存储在分片P1上

    2020-04-11_043043

  • Document1是如何存储到分片P1的? 选择P1的依据是什么?

    • 需要文档到分片的映射算法
    • 目的
      • 使得文档均匀分布在所有的分片上, 以充分利用资源
    • 算法
      • 随机选择或者round-robin算法?
        • 不可取, 因为需要维护文档到分片的映射关系, 成本巨大
      • 根据文档实时计算对应的分片才是可取之道

文档到分片的映射算法

  • es通过如下的公式计算文档对应的分片
    • shard = hash(routing) % number_of_primary_shards
    • hash算法保证可以将数据均匀的分散在分片中
    • routing是一个关键参数, 默认是文档的id, 也可以自行制定
    • number_of_primary_shards是主分片数
  • 该算法与主分片数相关, 这也是分片数一旦确定后便不能更改的原因

文档创建流程

  1. Client向node3发起创建文档的请求
  2. node3通过routing计算该文档应该存储到Shard1上, 查询cluster state 后确认主分片P1在node2上, 然后转发创建文档的请求到node2上
  3. P1接收并执行创建文档请求后, 将同样的请求发送到副本分片R1
  4. R1接收并执行创建文档请求后, 通知P1成功的结果
  5. P1接收副本分片结果后, 通知node3创建成功
  6. node3返回结果到Client

2020-04-11_044328

文档读取流程

  1. Client向node3发起获取文档1的请求
  2. node3通过routing计算该文档在Shard1上, 查询cluster state后获取Shard1的主副分片列表, 然后以轮训的机制获取一个shard, 比如这里是R1, 然后转发读取文档的请求到node1
  3. R1接收并执行读取文档请求后, 将结果返回node3
  4. node3返回结果给Client

2020-04-11_044744

文档批量创建的流程

  1. Client向node3发起批量创建文档的请求(bulk)
  2. node3通过routing计算所有文档对应的shard, 然后按照主shard分配对应执行的操作, 同时发送请求到涉及的主shard, 比如这里3个主shard都需要参与
  3. 主shard接收并执行请求后, 将同样的请求同步到对应的副本shard
  4. 副本shard执行结果后返回结果到主shard, 主shard再返回node3
  5. node3整合结果后返回Client

2020-04-11_045350

文档批量读取的流程

  1. Client向node3发起批量获取文档的请求(mget)
  2. node3通过routing计算所有文档对应的shard, 然后轮训的机制获取需要参与的shard, 按照shard构建mget请求同时发送请求到设计的shard, 比如这里有两个shard需要参与
  3. R1, R2返回文档结果给node3
  4. node3返回结果给Client

2020-04-11_045822

脑裂问题

  • 脑裂问题, 英文名为split-brain, 是分布式系统中的经典网络问题, 如下图所示:

    • 3个节点组成的集群, 突然node1的网络和其他两个节点中断

      node2与node3会重新选举master, 比如node2成为了新的master, 此时会更新cluster state

      node1自己组成集群后, 也会更新cluster state

      同一个集群有两个master, 而且维护不同的cluster state, 网络恢复后无法选择正确的master

    2020-04-11_050815

  • 解决方案: 仅在可选举master-eligible节点数大于等于quorum时才可以进行master选举

    • quorum = master-eligible节点数/2 + 1, 例如3个master-eligible节点时, quorum为2.
    • 设定discovery.zen.minimum_master_nodes 为quorum即可避免脑裂

    2020-04-11_051200

Shard详解

倒排索引的不可变更

  • 倒排索引一旦生成, 不能更改

    • 其好处如下:
      • 不用考虑并发写文件的考虑, 杜绝了锁机制带来的性能问题
      • 由于文件不再更改, 可以充分利用文件系统缓存, 只需载入一次, 只要内存足够, 对该文件的读取都会从内存读取, 性能高
      • 利于生成缓存数据
      • 利于对文件进行压缩存储, 节省磁盘和内存存储空间
    • 坏处为需要写入新文档时, 必须重新构建倒排索引文件, 然后替换掉老文件后, 新文档才能被检索, 导致文档实时性差, 如下图所示:

    2020-04-11_053130

    • 为此, 一个解决方案为, 生成一个新的倒排索引文件,在用户进行查询时, 同时查询这”两个”倒排索引文件, 然后结合结果返回给用户, 如下图所示:

    2020-04-11_053354

文档搜索实时性

  • Lucene便是采用了上述方案, 它构建的单个倒排索引文件称为segment, 合在一起称为Index, 与ES中的Index概念不同. ES中的一个Shard对应一个Lucene Index.
  • Lucene会有一个专门的文件来记录所有的segment信息, 称为commit point.
2020-04-11_053737

refresh

  • segment写入磁盘的过程依然很耗时, 可以借助文件系统缓存的特性, 先将segment在缓存中创建并开放查询来进一步提升实时性, 该过程在es中称为refresh.
  • 在refresh之前文档会先存储在一个buffer中, refresh时将buffer中的所有文档清空并生成segment.
  • es默认每一秒执行一次refresh, 因此文档的实时性被提高到一秒, 这也是es被称为近实时(Near Real Time)的原因.

2020-04-11_054400

translog

  • 如果在内存中的segment还没有写入磁盘前发生了宕机, n那么其中的文档就无法恢复了, 如何解决这个问题?

    • es引入了translog机制. 写入文档到buffer时, 同时将该操作写入translog.
    • translog文件会即时写入磁盘(fsync), 6.x默认每个请求都会落盘, 可以修改为每5秒写一次, 这样的风险便是丢失5秒内的数据, 相关配置为index.translog.*
    • es启动时会检查translog文件, 并从中恢复

    2020-04-11_055016

flush

  • flush负责将内存中的segment写入磁盘, 主要做如下的工作:

    • 将translog写入磁盘
    • 将index buffer清空, 其中的文档生成一个新的segment, 相当于一个refresh操作
    • 更新commit point并写入磁盘
    • 执行fsync操作, 将内存中的segment写入磁盘
    • 删除旧的translog文件

    2020-04-11_055501

refresh发生时机

  • refresh发生的时机主要有如下几种情况:
    • 间隔时间达到时, 通过index.settings.refresh_interval来设定, 默认是1秒
    • index.buffer占满时, 其大小通过indices.memory.index_buffer_size设置, 默认为jvm heap的10%, 所有shard共享
    • flush发生时也会发生refresh

flush发生时机

  • flush发生的时机主要有如下几种情况:
    • 间隔时间达到时,默认是30分钟, 5.x之前可以通过index.translog.fresh_threshold_period修改, 之后无法修改.
    • translog占满时, 其大小可以通过indices.translog.fresh_threshold_size控制, 默认为512mb, 每个index有自己的translog.

删除与更新文档

  • segment一旦生成就不能修改, 那么如果你要删除文档该如何操作?
    • Lucene专门维护一个.del的文件, 记录已经删除的文档, 注意.del上记录的是文档在Lucene内部的id
    • 在查询结果返回前会过滤掉.del中的所有文档
  • 更新文档如何进行呢?
    • 首先删除文档, 然后再创建新文档

Segment Merging

  • 随着segment的增多, 由于一次查询的segment数增多, 查询速度会变慢
  • es会定时在后台进行segment merge的操作, 减少segment的数量
  • 通过force_merge api可以手动强制做segment merge的操作

整体视角

  • Lucene Index与ES Index的术语对照如下所示:

    2020-04-11_060717


   转载规则


《Elasticsearch篇之分布式特性》 Jiavg 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录